package com.iteaj.iot.server.manager;

import com.iteaj.iot.*;
import com.iteaj.iot.client.UnWritableProtocolException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentMap;

/**
 * Channel group for TCP devices that also indexes channels by device code.
 *
 * <p>Channels added through {@link #add(String, Channel)} are stored both in the inherited
 * {@link DefaultChannelGroup} and in a {@code device code -> Channel} map, so a connection can be
 * looked up, written to, closed or removed by its device code.</p>
 *
 * <p>{@link #useSize()}, {@link #isEmpty()} and {@link #devices()} only reflect channels registered
 * under a device code; the inherited {@link #size()} counts every channel held by the group.</p>
 *
 * Create Date By 2020-09-12
 * @author iteaj
 * @since 1.8
 */
public class TcpDeviceManager extends DefaultChannelGroup implements ChannelManager {

    private static final Logger logger = LoggerFactory.getLogger(TcpDeviceManager.class);

    /**
     * Executor passed to the parent group; retained so it can be exposed via {@link #getExecutor()}.
     */
    private final EventExecutor executor;

    /**
     * Channels registered under a device code (key: device code, value: channel).
     */
    private final ConcurrentMap<String, Channel> registerChannels = PlatformDependent.newConcurrentHashMap();

    /**
     * @param name     name of the channel group
     * @param executor executor handed to the parent {@link DefaultChannelGroup}
     */
    public TcpDeviceManager(String name, EventExecutor executor) {
        super(name, executor);
        this.executor = executor;
    }

    /**
     * @param name       name of the channel group
     * @param executor   executor handed to the parent {@link DefaultChannelGroup}
     * @param stayClosed forwarded unchanged to the parent {@link DefaultChannelGroup} constructor
     */
    public TcpDeviceManager(String name, EventExecutor executor, boolean stayClosed) {
        super(name, executor, stayClosed);
        this.executor = executor;
    }

    /**
     * Adds the channel to the group and registers it under the given device code.
     *
     * @param equipCode device code to register the channel under
     * @param channel   channel of the device
     * @return the channel previously registered under {@code equipCode}, or {@code null} if none
     */
    @Override
    public Channel add(String equipCode, Channel channel) {
        this.add(channel);
        return registerChannels.put(equipCode, channel);
    }

    /**
     * @return number of channels registered under a device code
     */
    @Override
    public int useSize() {
        return registerChannels.size();
    }

    /**
     * @return read-only view of the channels registered under a device code
     */
    @Override
    public Collection<Channel> devices() {
        return Collections.unmodifiableCollection(registerChannels.values());
    }

    /**
     * Looks up the channel registered under a device code.
     *
     * @param equipCode device code; {@code null} is treated as "not registered"
     * @return the registered channel, or {@code null} if there is none
     */
    @Override
    public Channel find(String equipCode) {
        // The backing concurrent map rejects null keys, so answer "absent" instead of throwing.
        return equipCode == null ? null : registerChannels.get(equipCode);
    }

    /**
     * Unregisters the device code and removes its channel from the group.
     *
     * @param equipCode device code; {@code null} is treated as "not registered"
     * @return the channel that was registered under {@code equipCode}, or {@code null} if none
     */
    @Override
    public Channel remove(String equipCode) {
        Channel registered = equipCode == null ? null : registerChannels.get(equipCode);
        if (registered != null) {
            // Delegate to remove(Object) so the registration map, the group and the
            // statistics log are all handled in one place.
            remove((Object) equipCode);
        }

        return registered;
    }

    /**
     * Closes the connection of the given device and waits for the close to complete.
     *
     * <p>NOTE(review): this blocks the calling thread until the close finishes; Netty rejects
     * blocking waits issued from the channel's own event loop - confirm callers run elsewhere.</p>
     *
     * @param equipCode device code
     * @return {@code true} if no channel is registered, the channel is already inactive,
     *         or the close completed successfully
     */
    @Override
    public boolean close(String equipCode) {
        Channel channel = find(equipCode);
        if (channel == null || !channel.isActive()) {
            return true;
        }

        return channel.close().syncUninterruptibly().isSuccess();
    }

    /**
     * @param equipCode device code
     * @return {@code true} if no channel is registered under the code or the channel is not active
     */
    @Override
    public boolean isClose(String equipCode) {
        Channel channel = find(equipCode);
        return channel == null || !channel.isActive();
    }

    /**
     * @return {@code true} if no channel is registered under a device code
     */
    @Override
    public boolean isEmpty() {
        return registerChannels.isEmpty();
    }

    /**
     * Membership test that accepts either a device code or anything the parent group accepts.
     *
     * @param o a device code ({@code String}), or an object checked by the parent group
     * @return {@code true} if the device code is registered, or the parent group contains {@code o}
     */
    @Override
    public boolean contains(Object o) {
        if (o instanceof String) {
            return registerChannels.containsKey(o);
        }

        return super.contains(o);
    }

    /**
     * Removes a channel from the group, identified either by the channel itself or by device code.
     *
     * <p>When a {@link Channel} is given, its device-code registration is dropped only if that
     * code is still mapped to this very channel. A channel left over from an earlier connection
     * of the same device therefore cannot unregister the device's current connection
     * (see https://gitee.com/iteaj/iot/issues/I7V94U).</p>
     *
     * @param o a {@link Channel} or a device code ({@code String})
     * @return {@code true} if a channel was removed from the group
     * @throws IllegalArgumentException if {@code o} is neither a {@code String} nor a {@link Channel}
     */
    @Override
    public boolean remove(Object o) {
        if (o instanceof Channel) {
            // Device code attached to the channel, if it has been registered.
            String deviceSn = ((Channel) o).attr(CoreConst.EQUIP_CODE).get();
            if (deviceSn != null) {
                // Atomic compare-and-remove: a separate lookup followed by an unconditional
                // remove could evict a channel registered in between the two steps.
                boolean unregistered = registerChannels.remove(deviceSn, o);
                if (!unregistered && logger.isTraceEnabled()) {
                    logger.trace("客户端管理({}) 移除非活动客户端 - 客户端编号: {}", name(), deviceSn);
                }
            }
        } else if (o instanceof String) {
            // Continue with the channel that was registered under the code (may be null).
            o = registerChannels.remove(o);
        } else {
            throw new IllegalArgumentException("只支持使用[String or Channel]移除客户端");
        }

        boolean removed = super.remove(o);
        logStatistics();

        return removed;
    }

    /**
     * Removes every channel from the group and drops all device-code registrations.
     */
    @Override
    public void clear() {
        super.clear();
        registerChannels.clear();
    }

    /**
     * Writes a message to the device registered under the given code and flushes it.
     *
     * <p>A {@link Protocol} message is forwarded to {@link #writeAndFlush(String, Protocol)}.</p>
     *
     * @param equipCode device code, must not be blank
     * @param msg       message to send, must not be {@code null}
     * @param args      not used by this implementation
     * @return empty if no channel is registered under the code; otherwise the write future, or a
     *         failed future if the channel is inactive ({@link NotDeviceException}) or not
     *         writable ({@link UnWritableProtocolException})
     * @throws IllegalArgumentException if {@code equipCode} is blank or {@code msg} is {@code null}
     */
    @Override
    public Optional<ChannelFuture> writeAndFlush(String equipCode, Object msg, Object... args) {
        if (msg instanceof Protocol) {
            return this.writeAndFlush(equipCode, (Protocol) msg);
        }

        requireSendable(equipCode, msg);

        Channel channel = find(equipCode);
        if (null == channel) {
            return Optional.empty();
        }

        if (!channel.isActive()) { // the connection is gone: drop the stale registration
            evictInactive(equipCode, channel);
            return Optional.of(channel.newFailedFuture(NotDeviceException.DEFAULT));
        }

        if (!channel.isWritable()) {
            return Optional.of(channel.newFailedFuture(new UnWritableProtocolException(msg
                    , channel.bytesBeforeWritable(), channel.bytesBeforeUnwritable())));
        }

        return Optional.of(channel.writeAndFlush(msg));
    }

    /**
     * Writes a protocol to the device registered under the given code and flushes it.
     *
     * @param equipCode device code, must not be blank
     * @param protocol  protocol to send, must not be {@code null}
     * @return empty if no channel is registered under the code; otherwise the write future, or a
     *         failed future if the channel is inactive ({@link NotDeviceException}) or not
     *         writable ({@link UnWritableProtocolException})
     * @throws IllegalArgumentException if {@code equipCode} is blank or {@code protocol} is {@code null}
     */
    @Override
    public Optional<ChannelFuture> writeAndFlush(String equipCode, Protocol protocol) {
        requireSendable(equipCode, protocol);

        Channel channel = find(equipCode);
        if (null == channel) {
            return Optional.empty();
        }

        if (!channel.isActive()) { // the connection is gone: drop the stale registration
            evictInactive(equipCode, channel);
            logger.warn("设备在线管理({}) 设备断线 - 设备编号: {} - 已连接：{} - 已注册：{} - 协议: {}"
                    , this.name(), equipCode, this.size(), this.useSize(), protocol.protocolType());
            return Optional.of(channel.newFailedFuture(NotDeviceException.DEFAULT));
        }

        if (!channel.isWritable()) {
            return Optional.of(channel.newFailedFuture(new UnWritableProtocolException(protocol
                    , channel.bytesBeforeWritable(), channel.bytesBeforeUnwritable())));
        }

        return Optional.of(channel.writeAndFlush(protocol));
    }

    /**
     * @return the executor this manager was constructed with
     */
    protected EventExecutor getExecutor() {
        return executor;
    }

    /**
     * Validates the arguments shared by both {@code writeAndFlush} overloads.
     */
    private static void requireSendable(String equipCode, Object msg) {
        if (StringUtils.isBlank(equipCode)) {
            throw new IllegalArgumentException("设备编号不能为空");
        }

        if (null == msg) {
            throw new IllegalArgumentException("请传入要发送的协议报文");
        }
    }

    /**
     * Drops a channel that was found inactive while trying to write to it.
     */
    private void evictInactive(String equipCode, Channel channel) {
        // Conditional removal: if the device reconnected after the lookup, the code now maps to a
        // different channel, and that registration must be left untouched.
        registerChannels.remove(equipCode, channel);
        super.remove(channel);
        logStatistics();
    }

    /**
     * Logs the total connection count and the registered count at debug level.
     */
    private void logStatistics() {
        if (logger.isDebugEnabled()) {
            logger.debug("客户端管理({}) 客户端统计 - 总连接数: {} - 总注册数: {}"
                    , name(), size(), registerChannels.size());
        }
    }

}
